案例丨详解当当网的分布式作业框架elastic-job
张亮,当当网架构师,当当技术委员会成员。对架构设计、消息中间件、分布式等领域兴趣浓厚。 目前主导当当应用框架ddframe研发和推广以及技术白皮书撰写。其中ddframe的分布式作业部分elastic-job已经正式开源。
作业的必要性以及存在的问题
1. 为什么需要作业?
作业即定时任务。一般来说,系统可使用消息传递代替部分使用作业的场景。两者确有相似之处。可互相替换的场景,如队列表。将待处理的数据放入队列表,然后使用频率极短的定时任务拉取队列表的数据并处理。这种情况使用消息中间件的推送模式可更好的处理实时性数据。而且基于数据库的消息存储吞吐量远远小于基于文件的顺序追加消息存储。
但在某些场景下则不能互换:
a) 时间驱动OR事件驱动:内部系统一般可以通过事件来驱动,但涉及到外部系统,则只能使用时间驱动。如:抓取外部系统价格。每小时抓取,由于是外部系统,不能像内部系统一样发送事件触发事件。
b) 批量处理OR逐条处理:批量处理堆积的数据更加高效,在不需要实时性的情况下比消息中间件更有优势。而且有的业务逻辑只能批量处理,如:电商公司与快递公司结算,一个月结算一次,并且根据送货的数量有提成。比如,当月送货超过1000则额外给快递公司多1%的快递费。
c) 非实时性OR实时性:虽然消息中间件可以做到实时处理数据,但有的情况并不需要。如:VIP用户降级,如果超过1年无购买行为,则自动降级。这类需求没有强烈的时间要求,不需要按照时间精确的降级VIP用户。
d) 系统内部OR系统解耦:作业一般封装在系统内部,而消息中间件可用于系统间解耦。
2. 当前常见的作业系统存在哪些问题?
a) Quartz:Java事实上的定时任务标准。但Quartz关注点在于定时任务而非数据,并无一套根据数据处理而定制化的流程。虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能。
b) TBSchedule:阿里早期开源的分布式任务调度系统。代码略陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常状况时是有缺陷的。而且TBSchedule作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重。
c) Crontab:Linux系统级的定时任务执行器。缺乏分布式和集中管理功能。
综上所述,当前存在的作业系统缺少分布式、并行调度、弹性扩容缩容、集中管理、定制化流程型任务等功能,所以需要一个新的作业系统完善这些功能。
解决思路
修改开源产品和基于开源产品重新搭建,是两种可行性较高的方案。
修改开源产品可控性较低,且一般都是针对于某些特定功能,所以我们采用将成熟的开源产品作为积木,在其之上重新封装,搭建一个崭新的产品,并命名为elastic-job。
elastic-job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。
1. 主要功能
a) 分布式:重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心。
b) 并行调度:采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。
c) 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。
d) 集中管理:采用基于Zookeeper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据Zookeeper的数据管理和监控elastic-job。
e) 定制化流程型任务:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。
2. 其他功能
a) 失效转移:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。
b) Spring命名空间支持:elastic-job可以不依赖于spring直接运行,但是也提供了自定义的命名空间方便与spring集成。
c) 运维平台:提供web控制台用于管理作业。
3. 非功能需求
a) 稳定性:在服务器无波动的情况下,并不会重新分片;即使服务器有波动,下次分片的结果也会根据服务器IP和作业名称哈希值算出稳定的分片顺序,尽量不做大的变动。
b) 高性能:同一服务器的批量数据处理采用自动切割并多线程并行处理。
c) 灵活性:所有在功能和性能之间的权衡,都可通过配置开启/关闭。如:elastic-job会将作业运行状态的必要信息更新到注册中心。如果作业执行频度很高,会造成大量Zookeeper写操作,而分布式Zookeeper同步数据可能引起网络风暴。因此为了考虑性能问题,可以牺牲一些功能,而换取性能的提升。
d) 幂等性:elastic-job可牺牲部分性能用以保证同一分片项不会同时在两个服务器上运行。
e) 容错性:作业服务器和Zookeeper断开连接则立即停止作业运行,用于防止分片已经重新分配,而脑裂的服务器仍在继续执行,导致重复执行。
实现方案及开发理念
1. elastic-job的具体模块的底层及如何实现
elastic-job采用去中心化设计,主要分为注册中心,数据分片,分布式协调,定时任务处理和定制化流程型任务等模块。
a) 去中心化
去中心化指elastic-job并无调度中心这一概念,每个运行在集群中的作业服务器都是对等的,节点之间通过注册中心进行分布式协调。但elastic-job有主节点的概念,主节点用于处理一些集中式任务,如分片,清理运行时信息等,并无调度功能,定时调度都是由作业服务器自行触发。
下面对比一下各自的优缺点:
中心化 | 去中心化 | |
实现难度 | 高 | 低 |
部署难度 | 高 | 低 |
触发时间统一控制 | 可以 | 不可以 |
触发延迟 | 有 | 无 |
异构语言支持 | 容易 | 困难 |
b) 注册中心
注册中心模块目前直接使用zookeeper,用于记录作业的配置,服务器信息以及作业运行状态。Zookeeper虽然很成熟,但原理复杂,使用较难,在海量数据支持的情况下也会有性能和网络问题。目前elastic-job已经抽象出注册中心的接口,下一步将会考虑支持多注册中心,如etcd,或由用户自行实现注册中心。无临时节点和监听机制的注册中心需要自行实现定时心跳监测等功能。
c) 数据分片
数据分片是elastic-job中实现分布式的重要概念,将真实数据和逻辑分片对应,用于解耦作业框架和数据的关系。作业框架只负责将分片合理的分配给相关的作业服务器,而作业服务器需要根据所分配的分片匹配数据进行处理。服务器分片目前都存储在注册中心中,各个服务器根据自己的IP地址拉取分片。
d) 分布式协调
分布式协调模块用于处理作业服务器的动态扩容缩容。一旦集群中有服务器发生变化,分布式协调将自动监测并将变化结果通知仍存活的作业服务器。协调时将会涉及主节点选举,重分片等操作。目前使用的Zookeeper的临时节点和监听器实现主动检查和通知功能。
e) 定时任务处理
定时任务处理根据cron表达式定时触发任务,目前有防止任务同时触发,错过任务重出发等功能。主要还是使用Quartz本身的定时调度功能,为了便于控制,每个任务都使用独立的线程池。
f) 定制化流程型任务
定制化流程型任务将定时任务分为多种流程,有不经任何修饰的简单任务;有用于处理数据的fetchData/processData的数据流任务;以后还将增加消息流任务,文件任务,工作流任务等。用户能以插件的形式扩展并贡献代码。
2. 部署和使用
将使用elastic-job框架的jar/war连接同一个基于Zookeeper的注册中心即可。
3. 对开源产品的开发理念
elastic-job的开源主要是为了反馈社区。开源短短两个月,我们收到了很多朋友的反馈和支持,非常感谢。技术类开源项目和一般的业务型项目不同,更需要对代码和质量的控制,我们总结出以下几点:
a) 用心写代码,用代码讲故事。代码是项目的唯一核心和产出,任何一行的代码都需要用心思考优雅性,可读性,合理性。
a) 代码整洁干净到极致。只有代码漂亮整洁,其他开源爱好者才愿意阅读代码,进而找出项目中的bug和贡献高质量代码。
b) 极简代码, 高度复用,无重复代码和配置。Java生态圈的特点是高质量的开源产品极多。我们尽量考虑复用轮子,比如项目中大量用到lombok简化代码;但也不会无原则的使用开源产品,我们倾向于把开源产品分为积木类和大厦类。项目中一般只考虑使用积木类搭建属于我们自己的大厦,而不会直接用其他已成型的大厦。
c) 单一需求可不考虑扩展性;两个类似需求时再提炼。
d) 模块抽象划分合理。
e) 如无特殊理由, 测试需全覆盖。elastic-job核心模块的测试覆盖率是95%以上。
f) 对质量的定义。代码可读性 > 代码可测性 > 模块解耦设计 > 功能正确性 > 性能 > 功能可扩展性。只有代码可读,可测试,可100%掌控,项目才可持续发展。功能有缺陷可以修复,性能不够可以优化,而代码不清晰则项目会渐渐变为黑盒。所以对于框架类产品,我们认为质量 > 时间 > 成本。
g) 文档清晰。
未来展望
目前的elastic-job定位是一个基于java的定时任务调度框架,未来想发展成为支持异构语言,高度灵活,可自定制的定时任务调度产品。
a) 异构语言支持。目前采用的无中心设计,难于支持多语言,考虑调度中心的可行性。
b) 监控体系有待提高,目前只能通过注册中心做简单的存活和数据积压监控。未来需要做的监控部分有:
增加可监控维度,如作业运行时间等。
基于JMX的内部状态监控。
基于历史的全量数据监控,将所有监控数据通过flume等形式发到外部监控中心,提供实时分析功能。
c) 多种注册中心支持。
c) 增加任务工作流,如任务依赖,初始化任务,清理任务等。
d) 失效转移功能的实时性提升。
e) 更多作业类型支持,如文件,MQ等类型作业的支持。
f) 更多分片策略支持。
附录:elastic-job的来历
elastic-job原本是当当java应用框架ddframe的一部分,本名dd-job。
ddframe包括编码规范,开发框架,技术规范,监控以及分布式组件。
当当希望将ddframe的各个模块与公司环境解耦并开源以反馈社区。之前开源的Dubbo扩展版本DubboX即是dd-soa的核心模块。而本次介绍的elastic-job则是dd-job的开源部分,其中监控(但开源了监控方法)和ddframe核心接入等部分并未开源。
项目开源地址戳“阅读原文”👇